18.8.3 消息重复消费与幂等性处理

1. 消息重复消费问题分析

1.1 重复消费产生原因

public class MessageDuplicationAnalysis {
    
    /*
     * 消息重复消费产生原因:
     * 
     * 1. 网络异常
     *    - 消费者处理完消息后,ACK确认丢失
     *    - 网络抖动导致消息重新投递
     *    - 消费者重启后重新消费未确认消息
     * 
     * 2. 消费者异常
     *    - 消费者处理过程中崩溃
     *    - 消费者处理超时
     *    - 消费者重启或扩容
     * 
     * 3. 消息队列机制
     *    - At Least Once语义保证
     *    - 消息重试机制
     *    - 分区重平衡
     * 
     * 4. 业务逻辑问题
     *    - 消费逻辑执行时间过长
     *    - 消费者并发处理相同消息
     *    - 事务回滚导致消息重新消费
     */
    
    public void demonstrateMessageDuplication() {
        System.out.println("=== 消息重复消费场景演示 ===");
        
        MockMessageQueue messageQueue = new MockMessageQueue();
        
        demonstrateNetworkFailure(messageQueue);
        demonstrateConsumerCrash(messageQueue);
        demonstrateTimeoutScenario(messageQueue);
        demonstrateConcurrentConsumption(messageQueue);
    }
    
    private void demonstrateNetworkFailure(MockMessageQueue messageQueue) {
        System.out.println("--- 网络异常导致重复消费 ---");
        
        MockConsumer consumer = new MockConsumer("consumer-1");
        
        System.out.println("1. 消费者处理消息:");
        Message message = new Message("msg-001", "订单创建", "order:12345");
        
        // 消费者处理消息
        boolean processed = consumer.processMessage(message);
        System.out.println("消息处理结果: " + processed);
        
        System.out.println("\n2. ACK确认失败 (网络异常):");
        boolean ackSuccess = messageQueue.acknowledgeMessage(message.getId(), false); // 模拟ACK失败
        System.out.println("ACK确认结果: " + ackSuccess);
        
        System.out.println("\n3. 消息重新投递:");
        if (!ackSuccess) {
            System.out.println("由于ACK失败,消息将重新投递");
            boolean reprocessed = consumer.processMessage(message);
            System.out.println("重复处理结果: " + reprocessed);
        }
        
        System.out.println("\n4. 风险分析:");
        System.out.println("   - 订单可能被重复创建");
        System.out.println("   - 库存可能被重复扣减");
        System.out.println("   - 用户可能收到重复通知");
    }
    
    private void demonstrateConsumerCrash(MockMessageQueue messageQueue) {
        System.out.println("\n--- 消费者崩溃导致重复消费 ---");
        
        MockConsumer consumer = new MockConsumer("consumer-2");
        
        System.out.println("1. 消费者开始处理消息:");
        Message message = new Message("msg-002", "支付处理", "payment:67890");
        
        try {
            // 模拟消费者在处理过程中崩溃
            consumer.processMessageWithCrash(message);
        } catch (RuntimeException e) {
            System.out.println("消费者崩溃: " + e.getMessage());
        }
        
        System.out.println("\n2. 消费者重启后重新消费:");
        MockConsumer newConsumer = new MockConsumer("consumer-2-restart");
        boolean reprocessed = newConsumer.processMessage(message);
        System.out.println("重启后处理结果: " + reprocessed);
        
        System.out.println("\n3. 问题影响:");
        System.out.println("   - 支付可能被重复处理");
        System.out.println("   - 资金可能被重复扣除");
        System.out.println("   - 需要幂等性保证");
    }
    
    private void demonstrateTimeoutScenario(MockMessageQueue messageQueue) {
        System.out.println("\n--- 处理超时导致重复消费 ---");
        
        MockConsumer slowConsumer = new MockConsumer("slow-consumer");
        
        System.out.println("1. 消费者处理耗时消息:");
        Message message = new Message("msg-003", "数据同步", "sync:data:large");
        
        // 模拟处理超时
        boolean processed = slowConsumer.processSlowMessage(message, 5000); // 5秒处理时间
        System.out.println("消息处理结果: " + processed);
        
        System.out.println("\n2. 消息队列超时重投:");
        if (messageQueue.isMessageTimeout(message.getId(), 3000)) { // 3秒超时
            System.out.println("消息处理超时,重新投递给其他消费者");
            
            MockConsumer fastConsumer = new MockConsumer("fast-consumer");
            boolean reprocessed = fastConsumer.processMessage(message);
            System.out.println("其他消费者处理结果: " + reprocessed);
        }
        
        System.out.println("\n3. 超时处理策略:");
        System.out.println("   - 设置合理的消息超时时间");
        System.out.println("   - 优化消费者处理逻辑");
        System.out.println("   - 实现幂等性处理");
    }
    
    private void demonstrateConcurrentConsumption(MockMessageQueue messageQueue) {
        System.out.println("\n--- 并发消费导致重复处理 ---");
        
        Message message = new Message("msg-004", "库存扣减", "inventory:reduce:100");
        
        System.out.println("1. 多个消费者同时处理相同消息:");
        
        // 启动多个消费者线程
        for (int i = 1; i <= 3; i++) {
            final int consumerId = i;
            new Thread(() -> {
                MockConsumer consumer = new MockConsumer("concurrent-consumer-" + consumerId);
                boolean processed = consumer.processMessage(message);
                System.out.println("消费者" + consumerId + "处理结果: " + processed);
            }).start();
        }
        
        try {
            Thread.sleep(1000); // 等待并发处理完成
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("\n2. 并发问题分析:");
        System.out.println("   - 相同消息被多个消费者处理");
        System.out.println("   - 可能导致数据不一致");
        System.out.println("   - 需要分布式锁或幂等性控制");
    }
}

// 消息实体
class Message {
    private String id;
    private String type;
    private String payload;
    private long timestamp;
    
    public Message(String id, String type, String payload) {
        this.id = id;
        this.type = type;
        this.payload = payload;
        this.timestamp = System.currentTimeMillis();
    }
    
    public String getId() { return id; }
    public String getType() { return type; }
    public String getPayload() { return payload; }
    public long getTimestamp() { return timestamp; }
    
    @Override
    public String toString() {
        return String.format("Message{id='%s', type='%s', payload='%s'}", id, type, payload);
    }
}

// 模拟消息队列
class MockMessageQueue {
    private java.util.Map<String, Long> messageTimestamps = new java.util.concurrent.ConcurrentHashMap<>();
    
    public boolean acknowledgeMessage(String messageId, boolean success) {
        if (success) {
            messageTimestamps.remove(messageId);
            System.out.println("    消息ACK成功: " + messageId);
            return true;
        } else {
            System.out.println("    消息ACK失败: " + messageId);
            return false;
        }
    }
    
    public boolean isMessageTimeout(String messageId, long timeoutMs) {
        Long timestamp = messageTimestamps.get(messageId);
        if (timestamp != null) {
            return System.currentTimeMillis() - timestamp > timeoutMs;
        }
        return false;
    }
}

// 模拟消费者
class MockConsumer {
    private String consumerId;
    private java.util.Set<String> processedMessages = new java.util.concurrent.ConcurrentHashMap<String, Boolean>().keySet(java.util.concurrent.ConcurrentHashMap.newKeySet());
    
    public MockConsumer(String consumerId) {
        this.consumerId = consumerId;
    }
    
    public boolean processMessage(Message message) {
        System.out.println("  " + consumerId + " 开始处理消息: " + message);
        
        // 模拟业务处理
        try {
            Thread.sleep(100); // 模拟处理时间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 记录已处理消息
        processedMessages.add(message.getId());
        
        System.out.println("  " + consumerId + " 处理完成: " + message.getId());
        return true;
    }
    
    public boolean processMessageWithCrash(Message message) {
        System.out.println("  " + consumerId + " 开始处理消息: " + message);
        
        // 模拟处理过程中崩溃
        try {
            Thread.sleep(50);
            throw new RuntimeException("消费者崩溃");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        return false;
    }
    
    public boolean processSlowMessage(Message message, long processTimeMs) {
        System.out.println("  " + consumerId + " 开始处理耗时消息: " + message);
        
        try {
            Thread.sleep(processTimeMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("  " + consumerId + " 耗时处理完成: " + message.getId());
        return true;
    }
}

2. 幂等性设计与实现

2.1 幂等性实现方案

public class IdempotencyImplementation {
    
    /*
     * 幂等性实现方案:
     * 
     * 1. 唯一键去重
     *    - 数据库唯一约束
     *    - Redis Set去重
     *    - 业务唯一标识
     * 
     * 2. 状态机控制
     *    - 订单状态流转
     *    - 支付状态控制
     *    - 业务状态检查
     * 
     * 3. 分布式锁
     *  

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java面试圣经 文章被收录于专栏

Java面试圣经,带你练透java圣经

全部评论

相关推荐

大名鼎鼎楚雨荨:我寻思这不才刚二面?
秋招的第一个offer,...
点赞 评论 收藏
分享
_追梦旅人_:同学考虑深圳睿联不,我们正在秋招,可在我主页看岗位,感兴趣可直接投递~
投递美团等公司10个岗位
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务