18.8.3 消息重复消费与幂等性处理
1. 消息重复消费问题分析
1.1 重复消费产生原因
public class MessageDuplicationAnalysis { /* * 消息重复消费产生原因: * * 1. 网络异常 * - 消费者处理完消息后,ACK确认丢失 * - 网络抖动导致消息重新投递 * - 消费者重启后重新消费未确认消息 * * 2. 消费者异常 * - 消费者处理过程中崩溃 * - 消费者处理超时 * - 消费者重启或扩容 * * 3. 消息队列机制 * - At Least Once语义保证 * - 消息重试机制 * - 分区重平衡 * * 4. 业务逻辑问题 * - 消费逻辑执行时间过长 * - 消费者并发处理相同消息 * - 事务回滚导致消息重新消费 */ public void demonstrateMessageDuplication() { System.out.println("=== 消息重复消费场景演示 ==="); MockMessageQueue messageQueue = new MockMessageQueue(); demonstrateNetworkFailure(messageQueue); demonstrateConsumerCrash(messageQueue); demonstrateTimeoutScenario(messageQueue); demonstrateConcurrentConsumption(messageQueue); } private void demonstrateNetworkFailure(MockMessageQueue messageQueue) { System.out.println("--- 网络异常导致重复消费 ---"); MockConsumer consumer = new MockConsumer("consumer-1"); System.out.println("1. 消费者处理消息:"); Message message = new Message("msg-001", "订单创建", "order:12345"); // 消费者处理消息 boolean processed = consumer.processMessage(message); System.out.println("消息处理结果: " + processed); System.out.println("\n2. ACK确认失败 (网络异常):"); boolean ackSuccess = messageQueue.acknowledgeMessage(message.getId(), false); // 模拟ACK失败 System.out.println("ACK确认结果: " + ackSuccess); System.out.println("\n3. 消息重新投递:"); if (!ackSuccess) { System.out.println("由于ACK失败,消息将重新投递"); boolean reprocessed = consumer.processMessage(message); System.out.println("重复处理结果: " + reprocessed); } System.out.println("\n4. 风险分析:"); System.out.println(" - 订单可能被重复创建"); System.out.println(" - 库存可能被重复扣减"); System.out.println(" - 用户可能收到重复通知"); } private void demonstrateConsumerCrash(MockMessageQueue messageQueue) { System.out.println("\n--- 消费者崩溃导致重复消费 ---"); MockConsumer consumer = new MockConsumer("consumer-2"); System.out.println("1. 消费者开始处理消息:"); Message message = new Message("msg-002", "支付处理", "payment:67890"); try { // 模拟消费者在处理过程中崩溃 consumer.processMessageWithCrash(message); } catch (RuntimeException e) { System.out.println("消费者崩溃: " + e.getMessage()); } System.out.println("\n2. 消费者重启后重新消费:"); MockConsumer newConsumer = new MockConsumer("consumer-2-restart"); boolean reprocessed = newConsumer.processMessage(message); System.out.println("重启后处理结果: " + reprocessed); System.out.println("\n3. 问题影响:"); System.out.println(" - 支付可能被重复处理"); System.out.println(" - 资金可能被重复扣除"); System.out.println(" - 需要幂等性保证"); } private void demonstrateTimeoutScenario(MockMessageQueue messageQueue) { System.out.println("\n--- 处理超时导致重复消费 ---"); MockConsumer slowConsumer = new MockConsumer("slow-consumer"); System.out.println("1. 消费者处理耗时消息:"); Message message = new Message("msg-003", "数据同步", "sync:data:large"); // 模拟处理超时 boolean processed = slowConsumer.processSlowMessage(message, 5000); // 5秒处理时间 System.out.println("消息处理结果: " + processed); System.out.println("\n2. 消息队列超时重投:"); if (messageQueue.isMessageTimeout(message.getId(), 3000)) { // 3秒超时 System.out.println("消息处理超时,重新投递给其他消费者"); MockConsumer fastConsumer = new MockConsumer("fast-consumer"); boolean reprocessed = fastConsumer.processMessage(message); System.out.println("其他消费者处理结果: " + reprocessed); } System.out.println("\n3. 超时处理策略:"); System.out.println(" - 设置合理的消息超时时间"); System.out.println(" - 优化消费者处理逻辑"); System.out.println(" - 实现幂等性处理"); } private void demonstrateConcurrentConsumption(MockMessageQueue messageQueue) { System.out.println("\n--- 并发消费导致重复处理 ---"); Message message = new Message("msg-004", "库存扣减", "inventory:reduce:100"); System.out.println("1. 多个消费者同时处理相同消息:"); // 启动多个消费者线程 for (int i = 1; i <= 3; i++) { final int consumerId = i; new Thread(() -> { MockConsumer consumer = new MockConsumer("concurrent-consumer-" + consumerId); boolean processed = consumer.processMessage(message); System.out.println("消费者" + consumerId + "处理结果: " + processed); }).start(); } try { Thread.sleep(1000); // 等待并发处理完成 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("\n2. 并发问题分析:"); System.out.println(" - 相同消息被多个消费者处理"); System.out.println(" - 可能导致数据不一致"); System.out.println(" - 需要分布式锁或幂等性控制"); } } // 消息实体 class Message { private String id; private String type; private String payload; private long timestamp; public Message(String id, String type, String payload) { this.id = id; this.type = type; this.payload = payload; this.timestamp = System.currentTimeMillis(); } public String getId() { return id; } public String getType() { return type; } public String getPayload() { return payload; } public long getTimestamp() { return timestamp; } @Override public String toString() { return String.format("Message{id='%s', type='%s', payload='%s'}", id, type, payload); } } // 模拟消息队列 class MockMessageQueue { private java.util.Map<String, Long> messageTimestamps = new java.util.concurrent.ConcurrentHashMap<>(); public boolean acknowledgeMessage(String messageId, boolean success) { if (success) { messageTimestamps.remove(messageId); System.out.println(" 消息ACK成功: " + messageId); return true; } else { System.out.println(" 消息ACK失败: " + messageId); return false; } } public boolean isMessageTimeout(String messageId, long timeoutMs) { Long timestamp = messageTimestamps.get(messageId); if (timestamp != null) { return System.currentTimeMillis() - timestamp > timeoutMs; } return false; } } // 模拟消费者 class MockConsumer { private String consumerId; private java.util.Set<String> processedMessages = new java.util.concurrent.ConcurrentHashMap<String, Boolean>().keySet(java.util.concurrent.ConcurrentHashMap.newKeySet()); public MockConsumer(String consumerId) { this.consumerId = consumerId; } public boolean processMessage(Message message) { System.out.println(" " + consumerId + " 开始处理消息: " + message); // 模拟业务处理 try { Thread.sleep(100); // 模拟处理时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 记录已处理消息 processedMessages.add(message.getId()); System.out.println(" " + consumerId + " 处理完成: " + message.getId()); return true; } public boolean processMessageWithCrash(Message message) { System.out.println(" " + consumerId + " 开始处理消息: " + message); // 模拟处理过程中崩溃 try { Thread.sleep(50); throw new RuntimeException("消费者崩溃"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return false; } public boolean processSlowMessage(Message message, long processTimeMs) { System.out.println(" " + consumerId + " 开始处理耗时消息: " + message); try { Thread.sleep(processTimeMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(" " + consumerId + " 耗时处理完成: " + message.getId()); return true; } }
2. 幂等性设计与实现
2.1 幂等性实现方案
public class IdempotencyImplementation { /* * 幂等性实现方案: * * 1. 唯一键去重 * - 数据库唯一约束 * - Redis Set去重 * - 业务唯一标识 * * 2. 状态机控制 * - 订单状态流转 * - 支付状态控制 * - 业务状态检查 * * 3. 分布式锁 *
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经